package cn.doitedu.dw.pre

import java.util

import ch.hsr.geohash.GeoHash
import cn.doitedu.commons.util.SparkUtil
import cn.doitedu.dw.beans.AppLogBean
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Row}


/**
  * @date: 2020/1/12
  * @site: www.doitedu.cn
  * @author: hunter.d 涛哥
  * @qq: 657270652
  * @description: Preprocessing of app event-tracking logs.
  *
  * Pipeline implemented by [[AppLogDataPreprocess.main]]:
  *  1. read the raw log file as lines of JSON text;
  *  2. load the geohash -> (province, city, district) dictionary and broadcast it;
  *  3. load the id-hash -> guid dictionary and broadcast it;
  *  4. parse every line into an `AppLogBean`, dropping lines that fail to parse
  *     or fail the validity rule;
  *  5. fill province/city/district and guid from the broadcast dictionaries;
  *  6. drop beans whose guid could not be resolved and write the rest as parquet.
  */
object AppLogDataPreprocess {

  import scala.util.control.NonFatal

  /** Value placed in the guid slot of a freshly parsed bean; beans still carrying it are not written. */
  private val UnresolvedGuid: Long = Long.MinValue

  /** Coordinates used when the log record carries no usable longitude/latitude pair. */
  private val DefaultLng: Double = 0.0
  private val DefaultLat: Double = -90.0

  /** Number of geohash characters used as the key of the geo dictionary lookup. */
  private val GeoHashPrecision: Int = 5

  /**
    * Job entry point.
    *
    * @param args positional arguments:
    *             0 - input path of the raw app log file,
    *             1 - input path of the geohash dictionary (parquet),
    *             2 - input path of the current day's id-mapping dictionary (parquet),
    *             3 - output path of the preprocessed result (parquet),
    *             4 - Spark master.
    *             With fewer than five arguments the usage text is printed and the JVM exits with status 1.
    */
  def main(args: Array[String]): Unit = {
    if(args.length<5){
      println(
        """
          |help
          |请输入正确的参数：
          |1. app埋点日志原始文件输入路径
          |2. geohash地理位置字典输入路径
          |3. 当日的idmp映射字典所在路径
          |4. 预处理结果输出路径
          |5. spark运行模式的master
          |
        """.stripMargin)
      sys.exit(1)
    }

    // Build the SparkSession; it is closed in the finally block below so that a
    // failure anywhere in the job does not leave the session open.
    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName, args(4))
    try {
      import spark.implicits._

      // Raw app log of the day, one JSON document per line.
      val appDs: Dataset[String] = spark.read.textFile(args(0))

      // Geo dictionary; the columns read below are geo, province, city, district, e.g.
      //   39eu | 河北省 | 石家庄 | 裕华区
      //   y67u | 河南省 | 郑州市 | 金水区
      // It is collected to the driver as geo -> (province, city, district) and broadcast.
      val geoMap: collection.Map[String, (String, String, String)] =
        spark.read.parquet(args(1)).rdd.map { row =>
          val geo = row.getAs[String]("geo")
          val province = row.getAs[String]("province")
          val city = row.getAs[String]("city")
          val district = row.getAs[String]("district")
          (geo, (province, city, district))
        }.collectAsMap()
      val bcGeo = spark.sparkContext.broadcast(geoMap)

      // Id-mapping dictionary; the columns read below are biaoshi_hashcode and guid.
      // It is collected to the driver as biaoshi_hashcode -> guid and broadcast.
      val idMap: collection.Map[Long, Long] =
        spark.read.parquet(args(2)).rdd.map { row =>
          val idHash = row.getAs[Long]("biaoshi_hashcode")
          val guid = row.getAs[Long]("guid")
          (idHash, guid)
        }.collectAsMap()
      val bcId = spark.sparkContext.broadcast(idMap)

      appDs
        // Unparseable or invalid lines yield None and therefore contribute no element,
        // so no null AppLogBean is ever handed to the Dataset encoder.
        .flatMap(line => parseLine(line).toList)
        .map(bean => {
          fillRegion(bean, bcGeo.value)
          fillGuid(bean, bcId.value)
          bean
        })
        // Keep only the beans whose guid was resolved from the id-mapping dictionary.
        .filter(bean => bean.guid != UnresolvedGuid)
        .toDF()
        .write
        .parquet(args(3))
    } finally {
      spark.close()
    }
  }

  /**
    * Parses one raw JSON log line into an `AppLogBean`.
    *
    * @param line one line of the raw log file
    * @return `Some(bean)` when the line parses and passes the validity rule
    *         (at least one user/device identifier, a non-blank event id and a
    *         non-blank session id); `None` otherwise. Non-fatal parsing errors
    *         are printed and also result in `None`.
    */
  private def parseLine(line: String): Option[AppLogBean] =
    try {
      import scala.collection.JavaConverters._

      val root = JSON.parseObject(line)
      val eventid = root.getString("eventid")
      val timestamp = root.getString("timestamp").toLong

      // Copy the event attributes into an immutable Scala map. Values are converted
      // with toString (null stays null) instead of being cast, because the parsed
      // JSON object may hold non-string values.
      val eventobj: JSONObject = root.getJSONObject("event")
      val event: Map[String, String] = eventobj.getInnerMap.asScala.map { case (key, value) =>
        val text: String = if (value == null) null else value.toString
        key -> text
      }.toMap

      val userobj = root.getJSONObject("user")
      val uid = userobj.getString("uid")
      val sessionId = userobj.getString("sessionId")

      val phoneobj = userobj.getJSONObject("phone")
      val imei = phoneobj.getString("imei")
      val mac = phoneobj.getString("mac")
      val imsi = phoneobj.getString("imsi")
      val osName = phoneobj.getString("osName")
      val osVer = phoneobj.getString("osVer")
      val androidId = phoneobj.getString("androidId")
      val resolution = phoneobj.getString("resolution")
      val deviceType = phoneobj.getString("deviceType")
      val deviceId = phoneobj.getString("deviceId")
      val uuid = phoneobj.getString("uuid")

      val appobj = userobj.getJSONObject("app")
      val appid = appobj.getString("appid")
      val appVer = appobj.getString("appVer")
      val release_ch = appobj.getString("release_ch") // download channel
      val promotion_ch = appobj.getString("promotion_ch") // promotion channel

      val locobj = userobj.getJSONObject("loc")

      // Longitude and latitude are taken as a pair: if either one is missing or
      // unreadable, both fall back to the defaults.
      val (lng, lat) =
        try {
          val rawLng: java.lang.Double = locobj.getDouble("longtitude")
          val rawLat: java.lang.Double = locobj.getDouble("latitude")
          if (rawLng != null && rawLat != null) (rawLng.doubleValue, rawLat.doubleValue)
          else (DefaultLng, DefaultLat)
        } catch {
          case NonFatal(e) =>
            e.printStackTrace()
            (DefaultLng, DefaultLat)
        }

      val carrier = locobj.getString("carrier")
      val netType = locobj.getString("netType")
      val cid_sn = locobj.getString("cid_sn")
      val ip = locobj.getString("ip")

      // Validity rule. Concatenating null references produces the text "null",
      // which is stripped before the blank check, so the record needs at least
      // one identifier with real content.
      val joinedIds = (imei + imsi + mac + uid + uuid + androidId).replaceAll("null", "")
      val valid =
        StringUtils.isNotBlank(joinedIds) &&
          StringUtils.isNotBlank(eventid) &&
          StringUtils.isNotBlank(sessionId)

      if (valid) {
        Some(AppLogBean(
          UnresolvedGuid,
          eventid,
          event,
          uid,
          imei,
          mac,
          imsi,
          osName,
          osVer,
          androidId,
          resolution,
          deviceType,
          deviceId,
          uuid,
          appid,
          appVer,
          release_ch,
          promotion_ch,
          lng,
          lat,
          carrier,
          netType,
          cid_sn,
          ip,
          sessionId,
          timestamp
        ))
      } else {
        None
      }
    } catch {
      case NonFatal(e) =>
        e.printStackTrace()
        None
    }

  /**
    * Fills province/city/district of `bean` from the geo dictionary.
    *
    * The bean is left untouched when its coordinates are outside the valid
    * latitude/longitude range (this also covers NaN) or when the geohash is not
    * in the dictionary.
    *
    * @param bean    the bean to enrich in place
    * @param geoDict geohash -> (province, city, district)
    */
  private def fillRegion(bean: AppLogBean,
                         geoDict: collection.Map[String, (String, String, String)]): Unit = {
    val lat = bean.latitude
    val lng = bean.longtitude

    // NOTE(review): some geohash-java versions throw IllegalArgumentException for
    // out-of-range coordinates, which would fail the task; guard before hashing.
    if (math.abs(lat) <= 90.0 && math.abs(lng) <= 180.0) {
      val hash = GeoHash.geoHashStringWithCharacterPrecision(lat, lng, GeoHashPrecision)
      geoDict.get(hash).foreach { case (province, city, district) =>
        bean.province = province
        bean.city = city
        bean.district = district
      }
    }
  }

  /**
    * Fills the guid of `bean` from the id-mapping dictionary.
    *
    * The first non-blank identifier, in the order imei, imsi, mac, androidId,
    * uuid, uid, is hashed with `String.hashCode` and looked up. The bean keeps
    * its current guid when there is no non-blank identifier or no dictionary entry.
    *
    * @param bean     the bean to enrich in place
    * @param idmpDict identifier hash code -> guid
    */
  private def fillGuid(bean: AppLogBean, idmpDict: collection.Map[Long, Long]): Unit = {
    val candidates = Seq(bean.imei, bean.imsi, bean.mac, bean.androidId, bean.uuid, bean.uid)
    candidates
      .find(StringUtils.isNotBlank(_))
      .flatMap(id => idmpDict.get(id.hashCode.toLong))
      .foreach { resolved =>
        bean.guid = resolved
      }
  }

}
